# -*- coding: utf-8 -*-
from datetime import timedelta
from time import time
from utils.operators.rich_sql_sensor import RichSqlSensor
from jms.dm.dm_aviation_order_detail_dt import jms_dm__dm_aviation_order_detail_dt

label = f"dm_aviation_order_detail_dt_{int(time())}"
timeout = timedelta(hours=1).seconds
doris_jms_dm__dm_aviation_order_detail_dt = RichSqlSensor(
    task_id='doris_jms_dm__dm_aviation_order_detail_dt',
    pool='broker_load_pool',
    email=['matthew.xiong@jtexpress.com', 'yl_bigdata@yl-scm.com'],
    task_concurrency=1,
    conn_id='doris',
    pre_sql=f"""
    TRUNCATE TABLE jms_dm.dm_aviation_order_detail_dt PARTITION (
         p{{{{ execution_date | cst_ds_nodash }}}}
        ,p{{{{ execution_date | date_add(-1) |cst_ds_nodash }}}}
        ,p{{{{ execution_date | date_add(-2) |cst_ds_nodash }}}}
        ,p{{{{ execution_date | date_add(-3) |cst_ds_nodash }}}}
        ,p{{{{ execution_date | date_add(-4) |cst_ds_nodash }}}}
        ,p{{{{ execution_date | date_add(-5) |cst_ds_nodash }}}}
        ,p{{{{ execution_date | date_add(-6) |cst_ds_nodash }}}}
        ,p{{{{ execution_date | date_add(-7) |cst_ds_nodash }}}}
        ,p{{{{ execution_date | date_add(-8) |cst_ds_nodash }}}}
        ,p{{{{ execution_date | date_add(-9) |cst_ds_nodash }}}}
        ,p{{{{ execution_date | date_add(-10) |cst_ds_nodash }}}}
    );
    LOAD LABEL jms_dm.{label} (
        DATA INFILE(
          "hdfs://{{{{ var.value.hadoop_namespace }}}}/dw/hive/jms_dm.db/external/dm_aviation_order_detail_dt/dt={{{{ execution_date | cst_ds }}}}/*",
          "hdfs://{{{{ var.value.hadoop_namespace }}}}/dw/hive/jms_dm.db/external/dm_aviation_order_detail_dt/dt={{{{ execution_date | date_add(-1) |cst_ds }}}}/*",
          "hdfs://{{{{ var.value.hadoop_namespace }}}}/dw/hive/jms_dm.db/external/dm_aviation_order_detail_dt/dt={{{{ execution_date | date_add(-2) |cst_ds }}}}/*",
          "hdfs://{{{{ var.value.hadoop_namespace }}}}/dw/hive/jms_dm.db/external/dm_aviation_order_detail_dt/dt={{{{ execution_date | date_add(-3) |cst_ds }}}}/*",
          "hdfs://{{{{ var.value.hadoop_namespace }}}}/dw/hive/jms_dm.db/external/dm_aviation_order_detail_dt/dt={{{{ execution_date | date_add(-4) |cst_ds }}}}/*",
          "hdfs://{{{{ var.value.hadoop_namespace }}}}/dw/hive/jms_dm.db/external/dm_aviation_order_detail_dt/dt={{{{ execution_date | date_add(-5) |cst_ds }}}}/*",
          "hdfs://{{{{ var.value.hadoop_namespace }}}}/dw/hive/jms_dm.db/external/dm_aviation_order_detail_dt/dt={{{{ execution_date | date_add(-6) |cst_ds }}}}/*",
          "hdfs://{{{{ var.value.hadoop_namespace }}}}/dw/hive/jms_dm.db/external/dm_aviation_order_detail_dt/dt={{{{ execution_date | date_add(-7) |cst_ds }}}}/*",
          "hdfs://{{{{ var.value.hadoop_namespace }}}}/dw/hive/jms_dm.db/external/dm_aviation_order_detail_dt/dt={{{{ execution_date | date_add(-8) |cst_ds }}}}/*",
          "hdfs://{{{{ var.value.hadoop_namespace }}}}/dw/hive/jms_dm.db/external/dm_aviation_order_detail_dt/dt={{{{ execution_date | date_add(-9) |cst_ds }}}}/*",
          "hdfs://{{{{ var.value.hadoop_namespace }}}}/dw/hive/jms_dm.db/external/dm_aviation_order_detail_dt/dt={{{{ execution_date | date_add(-10) |cst_ds }}}}/*"
        )   
        INTO TABLE dm_aviation_order_detail_dt
        FORMAT AS 'PARQUET'
    ) 
    WITH BROKER '{{{{ var.json.doris_brokers | random_choice }}}}'
    PROPERTIES ('timeout'='{timeout}', 'max_filter_ratio'='0.0')
    """,
    poke_sql=f"SHOW LOAD FROM jms_dm WHERE label = '{label}' ORDER BY CreateTime DESC LIMIT 1",
    sql_on_kill=f"CANCEL LOAD FROM jms_dm WHERE LABEL = '{label}'",
    success=lambda r: r[2] == 'FINISHED',
    failure=lambda r: (r[2] is not None and r[2] == 'CANCELLED', str(r[7])),
    poke_interval=60,
    execution_timeout=timedelta(seconds=timeout + 120), )

doris_jms_dm__dm_aviation_order_detail_dt << jms_dm__dm_aviation_order_detail_dt
